Wczytanie bibliotek¶

In [1]:
import geopandas as gpd
import plotly.express as px
from plotly.subplots import make_subplots
import pyproj
from datetime import datetime, timedelta
import happybase as hb
import pandas as pd
import io
from fiona.collection import BytesCollection
import plotly.graph_objects as go

Sesja PySpark¶

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("putting_together").enableHiveSupport().getOrCreate()
23/01/08 12:43:32 WARN Utils: Your hostname, node1 resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
23/01/08 12:43:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/01/08 12:43:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Wczytanie danych pomiarowych o jakości powietrza¶

(z serving layer, dane w Hive)

In [3]:
df = spark.sql("select * from aqicn_readings;")
df = df.dropDuplicates()
df_all = df.toPandas()
yesterday = datetime.now() - timedelta(days=1)
df = df.filter(df.datetime > yesterday).toPandas()
/home/vagrant/project/.venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:178: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead
  series = series.astype(t, copy=False)
/home/vagrant/project/.venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:178: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead
  series = series.astype(t, copy=False)
In [4]:
df_all.head()
Out[4]:
station_id station_name latitude longitude location datetime aqi pm10 pm25
0 365221 Rudzka 52.278000 20.972000 Rudzka, Marymont-Kaskada, Bielany, Warsaw, Mas... 2023-01-04 16:30:48 40 17 40
1 93862 Jana Piekałkiewicza 52.193804 21.051250 Jana Piekałkiewicza, Marcelin, Mokotów, Warsaw... 2023-01-04 16:00:45 57 19 57
2 76228 Graniczna 52.238812 21.002685 Graniczna, IV, Śródmieście, Warsaw, Masovian V... 2023-01-04 15:00:35 0 0 0
3 81664 Motycka 52.275087 21.044866 Motycka 23, 23, Motycka, Targówek Mieszkaniowy... 2023-01-04 15:16:46 57 29 57
4 358753 Koszycka 52.242000 20.944000 Koszycka, Koło-Górczewska, Koło, Wola, Warsaw,... 2023-01-04 16:17:59 26 9 26
In [5]:
df.head()
Out[5]:
station_id station_name latitude longitude location datetime aqi pm10 pm25
0 112837 Pułkownika Mieczysława Niedzielskiego "Żywiciela" 52.268000 20.944000 Pułkownika Mieczysława Niedzielskiego "Żywicie... 2023-01-08 04:28:04 59 23 59
1 77956 Jerzego Beniamina Flatta 52.160000 21.066000 Jerzego Beniamina Flatta, Wilanów Wysoki, Wila... 2023-01-08 06:34:12 31 14 31
2 191677 Encyklopedyczna 52.308000 20.924000 Encyklopedyczna, Młociny, Bielany, Warsaw, Mas... 2023-01-08 08:42:37 31 14 31
3 103360 Bergamotki 52.177887 21.041421 Bergamotki, Green Mokotów, Mokotów, Warsaw, Ma... 2023-01-08 11:41:02 54 19 54
4 238336 Głębocka 52.310000 21.066000 Głębocka, Lewandów, Grodzisk, Białołęka, Warsa... 2023-01-07 20:27:36 61 15 61
In [6]:
df_all.shape, df.shape
Out[6]:
((7296, 9), (2674, 9))

Wczytanie danych mapowych¶

(z serving layer, dane w HBase)

In [7]:
con = hb.Connection("localhost")
con.open()
table = con.table("region_borders")
# do odpowiedniego wczytania shapefile .shp konieczne są wszystkie pliki pomocnicze 
# - wszystkie są w jednej rodzinie kolumn
for ext in ['shp', 'shx', 'dbf', 'prj', 'cpg', 'qpj']:
    binary = table.row(b"dzielnice_Warszawy")[f"map_files:{ext}".encode()]
    with open(f"temp/temp.{ext}", "wb") as f:
        f.write(binary)
In [8]:
data = gpd.read_file("temp/temp.shp")
data = data.to_crs(pyproj.CRS.from_epsg(4326))
data = data.set_index('nazwa_dzie')
In [9]:
data
Out[9]:
geometry
nazwa_dzie
Żoliborz POLYGON ((20.95755 52.26693, 20.95760 52.26712...
Praga-Południe POLYGON ((21.10347 52.22220, 21.10326 52.22206...
Mokotów POLYGON ((21.02160 52.21305, 21.02172 52.21307...
Wola POLYGON ((20.98121 52.25855, 20.98124 52.25838...
Wilanów POLYGON ((21.10500 52.19428, 21.10509 52.19404...
Wesoła POLYGON ((21.19234 52.23658, 21.18920 52.23786...
Wawer POLYGON ((21.10801 52.18913, 21.10798 52.18918...
Włochy POLYGON ((20.92000 52.21420, 20.92019 52.21408...
Ursynów POLYGON ((20.98653 52.16231, 20.98702 52.16252...
Śródmieście POLYGON ((21.06187 52.21720, 21.06172 52.21703...
Praga-Północ POLYGON ((21.00546 52.26812, 21.00542 52.26826...
Ursus POLYGON ((20.87858 52.20955, 20.87822 52.20932...
Targówek POLYGON ((21.06202 52.30838, 21.06205 52.30830...
Rembertów POLYGON ((21.14226 52.27855, 21.14906 52.27935...
Ochota POLYGON ((21.00181 52.22771, 21.00197 52.22747...
Bielany POLYGON ((20.92511 52.32894, 20.92511 52.32846...
Białołęka POLYGON ((21.01270 52.29430, 21.01267 52.29426...
Bemowo POLYGON ((20.88915 52.28046, 20.88917 52.28044...

Preprocessing danych do analiz¶

In [10]:
def extract_disctrict(location):
    for d in data.index:
        if d in location:
            return d
In [11]:
df_unique = df.groupby(['station_id']).agg({"datetime":"max"}).reset_index().merge(df, on=['station_id', 'datetime'], how='left')
df_unique['district'] = df_unique.location.apply(extract_disctrict)
df_all['district'] = df_all.location.apply(extract_disctrict)
In [12]:
data["label"] = "Warszawa"
In [13]:
df_unique['location_short'] = df_unique.apply(lambda row: row['location'].split(f', {row.district}')[0], axis=1)

AQI poszczególnych stacji¶

In [14]:
# background mapowy
fig = px.choropleth(data.reset_index(), locations="nazwa_dzie", geojson=data.geometry, color="label",
                    color_discrete_map={"Warszawa":"lightgrey"},
                    hover_name="nazwa_dzie", hover_data={"label": False, "nazwa_dzie": False})
fig.update_layout(height=800)
fig.update_geos(fitbounds='locations', projection_type='mercator', landcolor = 'rgb(255, 255, 255)')
In [15]:
fig2 = px.scatter_geo(df_unique, lat='latitude', lon='longitude', color='aqi', projection='mercator',
                      hover_name='station_name', 
                      hover_data={'aqi': True, 'location_short': True, 'district': True, 
                                  'latitude': False, 'longitude': False})
fig2.update_geos(fitbounds='locations')
In [16]:
fig3 = fig.add_trace(fig2.data[0])
fig3.update_layout(coloraxis_colorbar={"len":0.5, "title": "AQI"})
fig3.show()

Średnie pomiarów dla dzielnic¶

In [17]:
mean_values = df_unique.groupby(['district'])['aqi', 'pm10', 'pm25'].mean()
/tmp/ipykernel_26587/321958684.py:1: FutureWarning:

Indexing with multiple keys (implicitly converted to a tuple of keys) will be deprecated, use a list instead.

In [18]:
data_with_means = data.merge(mean_values, right_index=True, left_index=True)
In [19]:
data_with_means.reset_index()
Out[19]:
index geometry label aqi pm10 pm25
0 Żoliborz POLYGON ((20.95755 52.26693, 20.95760 52.26712... Warszawa 51.000000 18.000000 51.000000
1 Praga-Południe POLYGON ((21.10347 52.22220, 21.10326 52.22206... Warszawa 81.000000 26.000000 81.000000
2 Mokotów POLYGON ((21.02160 52.21305, 21.02172 52.21307... Warszawa 57.875000 22.000000 57.875000
3 Wola POLYGON ((20.98121 52.25855, 20.98124 52.25838... Warszawa 45.500000 20.000000 45.500000
4 Wilanów POLYGON ((21.10500 52.19428, 21.10509 52.19404... Warszawa 52.000000 24.000000 52.000000
5 Wesoła POLYGON ((21.19234 52.23658, 21.18920 52.23786... Warszawa 56.000000 31.000000 56.000000
6 Wawer POLYGON ((21.10801 52.18913, 21.10798 52.18918... Warszawa 58.833333 23.833333 58.833333
7 Włochy POLYGON ((20.92000 52.21420, 20.92019 52.21408... Warszawa 52.000000 25.500000 52.000000
8 Ursynów POLYGON ((20.98653 52.16231, 20.98702 52.16252... Warszawa 46.333333 16.000000 46.333333
9 Śródmieście POLYGON ((21.06187 52.21720, 21.06172 52.21703... Warszawa 50.500000 21.500000 50.500000
10 Praga-Północ POLYGON ((21.00546 52.26812, 21.00542 52.26826... Warszawa 55.000000 27.000000 55.000000
11 Ursus POLYGON ((20.87858 52.20955, 20.87822 52.20932... Warszawa 53.000000 20.000000 53.000000
12 Targówek POLYGON ((21.06202 52.30838, 21.06205 52.30830... Warszawa 61.000000 27.500000 61.000000
13 Rembertów POLYGON ((21.14226 52.27855, 21.14906 52.27935... Warszawa 25.000000 9.000000 25.000000
14 Ochota POLYGON ((21.00181 52.22771, 21.00197 52.22747... Warszawa 69.000000 27.000000 69.000000
15 Bielany POLYGON ((20.92511 52.32894, 20.92511 52.32846... Warszawa 52.750000 21.500000 52.750000
16 Białołęka POLYGON ((21.01270 52.29430, 21.01267 52.29426... Warszawa 92.166667 45.166667 92.166667
17 Bemowo POLYGON ((20.88915 52.28046, 20.88917 52.28044... Warszawa 59.800000 28.800000 59.800000
In [20]:
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.aqi,
                    hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean AQI values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "AQI"})
fig.show()
In [21]:
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.pm25,
                   hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean PM2.5 values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "PM2.5"})
fig.show()
In [22]:
fig = px.choropleth(locations=data_with_means.index, geojson=data_with_means.geometry, color=data_with_means.pm10,
                   hover_name=data_with_means.index)
fig.update_layout(height=800, title_text="Mean PM10 values")
fig.update_geos(fitbounds='locations', projection_type="mercator", landcolor = "rgb(255, 255, 255)")
fig.update_layout(coloraxis_colorbar={"len":0.5, "title": "PM10"})
fig.show()

Szereg czasowy pomiarów¶

In [23]:
fig = px.line(df.sort_values('datetime'), x='datetime', y='aqi', color='station_name')
fig.show()

Rozkład pomiarów¶

In [24]:
df_all = pd.concat([df_all, df_all.loc[df_all.groupby('station_id')["datetime"].idxmax().to_list()]])
In [25]:
fig = go.Figure()
fig.add_trace(
        go.Violin(
            x=df_all["district"],
            y=df_all["aqi"],
            text=df_all["station_name"],
            points="all",
            box_visible=True,
            meanline_visible=True,
            pointpos=1.5,
            jitter=0.7,
            selectedpoints=df_all.groupby('station_id')["datetime"].idxmax().to_list(),
            selected={"marker_color": "#371ea3", "marker_opacity": 1.0},
            unselected={"marker_opacity": 0.1},
            hovertemplate="<b>%{text}</b><br>AQI: %{y}<extra>%{x}</extra>",
            hoveron="points",
    ))
fig.update_layout(yaxis_title="AQI")
fig.show()
In [ ]: